Last Updated: November 13, 2025
Status: Phase 2 (Dual-Write) - Ready for full SQL migration
Objective: Complete transition from file-based to SQL-based storage for all outputs, models, and configurations
The ACM system is fully equipped for SQL-based operation. All infrastructure is in place:
Current State: Database schema complete, code supports dual-write, ZERO data in tables (fresh start)
Next Action: Enable dual-write mode and populate SQL database alongside file outputs for validation
BASE TABLES:
Equipment -- Asset master data (0 rows - ready for population)
Runs -- Pipeline execution tracking (0 rows)
ModelRegistry -- Trained model storage (0 rows)
ConfigLog -- Configuration change audit trail (0 rows)
Historian -- Raw time-series cache (0 rows)
TIME-SERIES OUTPUTS:
ScoresTS -- Detector scores (fused_z, ar1_z, pca_spe_z, etc.)
DriftTS -- Multi-feature drift signals
PCA_ScoresTS -- PCA T² and SPE scores
ForecastResidualsTS -- AR1 residual tracking
DataQualityTS -- Data quality metrics over time
ANALYTICS TABLES:
AnomalyEvents -- Episode detection results
RegimeEpisodes -- Operating regime periods
AnomalyTopSpikes -- Top contributing sensors per episode
XCorrTopPairs -- Sensor correlation rankings
FeatureImportance -- Drift culprit analysis
DriftSummary -- Drift change point summary
CPD_Points -- Change point detection results
RunStats -- Run-level quality metrics
MODEL PERSISTENCE:
PCA_Model -- PCA model parameters
PCA_Components -- PCA loadings/components
PCA_Metrics -- PCA quality metrics
VIEWS:
v_Equip_Anomalies -- Equipment anomaly summary
v_Equip_DriftTS -- Equipment drift timeline
v_Equip_SensorTS -- Equipment sensor time-series
v_PCA_Loadings -- PCA component interpretation
v_PCA_Scree -- PCA variance explained plot data
CORE LIFECYCLE:
usp_ACM_StartRun -- Initialize pipeline run
usp_ACM_FinalizeRun -- Complete pipeline run
DATA WRITES:
usp_Write_ScoresTS -- Batch insert detector scores
usp_Write_DriftTS -- Batch insert drift signals
usp_Write_AnomalyEvents -- Write episode detections
usp_Write_RegimeEpisodes -- Write regime transitions
usp_Write_AnomalyTopSpikes -- Write culprit sensors
usp_Write_XCorrTopPairs -- Write correlation pairs
usp_Write_FeatureImportance -- Write drift culprits
usp_Write_DriftSummary -- Write drift summary
usp_Write_CPD_Points -- Write change points
usp_Write_DataQualityTS -- Write quality metrics
usp_Write_ForecastResidualsTS -- Write forecast residuals
usp_Write_ConfigLog -- Write config changes
usp_Write_RunStats -- Write run statistics
PCA MODEL WRITES:
usp_Write_PCA_Model -- Persist PCA model
usp_Write_PCA_Metrics -- Write PCA quality metrics
usp_Write_PCA_Loadings -- Write PCA components
usp_Write_PCA_ScoresTS -- Write PCA scores
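The write procedures are designed for batch inserts. As a sketch of how a Python caller might feed them in chunks (the row shape and the EXEC parameter list here are assumptions, not the actual usp_Write_ScoresTS signature, which lives in scripts/sql/20_stored_procs.sql):

```python
from itertools import islice

def chunked(rows, size=5000):
    """Yield successive lists of at most `size` rows for cursor.executemany()."""
    it = iter(rows)
    while batch := list(islice(it, size)):
        yield batch

def write_scores_ts(cursor, run_id, rows):
    """Send detector scores to the write procedure in batches.

    Assumed row shape: (dt, fused_z, ar1_z). Check 20_stored_procs.sql
    for the real parameter list before using this pattern.
    """
    cursor.fast_executemany = True  # pyodbc array binding for bulk sends
    for batch in chunked(rows):
        cursor.executemany(
            "EXEC usp_Write_ScoresTS ?, ?, ?, ?",
            [(run_id, dt, fz, az) for dt, fz, az in batch],
        )
```

Chunking keeps each round-trip bounded, which matters once ScoresTS grows to thousands of rows per run.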
File: configs/sql_connection.ini (local, gitignored)
  Server: localhost\B19CL3PCQLSERVER
  Database: ACM
File: core/sql_client.py
  SQLClient.from_ini(db_section) - Load connection config
  Trusted_Connection support (Windows Auth)
File: core/output_manager.py (Lines 1-4547)
  write_table() method with automatic SQL fallback
File: core/acm_main.py
  SQL_MODE detection from config
  dual_mode flag support (line 658)
File: core/sql_performance.py
  SQLBatchWriter - Optimized bulk inserts
  SQLPerformanceMonitor - Performance tracking
  fast_executemany enabled
File: core/model_persistence.py
  ModelVersionManager - Model versioning system
  ModelRegistry table exists with:
    ModelType (varchar) - ar1, pca, iforest, gmm, regimes
    EquipID (int) - Equipment foreign key
    Version (int) - Model version number
    ParamsJSON (nvarchar) - Serialized model parameters
    StatsJSON (nvarchar) - Model quality metrics
    RunID (uniqueidentifier) - Link to training run
    EntryDateTime (datetime2) - Creation timestamp
File: utils/sql_config.py
  ConfigLog table reader/writer
Database:
  scripts/sql/40_seed_config.sql
  ConfigLog table tracks all config changes
File: scripts/sql/25_equipment_discovery_procs.sql
Status: Done (November 13, 2025)
Database: ACM
Evidence:
# Connection test passed
python scripts\sql\verify_acm_connection.py
# Output: CONNECTED server=B19cl3pc\B19CL3PCQLSERVER db=ACM
Objective: Run pipeline in dual-write mode, validate SQL outputs against file outputs
Duration: 1-2 weeks
Risk: Low (file output preserved as fallback)
Data Source: CSV files (existing)
Output: Files + SQL (both)
Step 1.1: Enable Dual-Write Mode ⏳
# Update config.yaml or set via command line
# Add to config.yaml:
output:
  dual_mode: true
  backend: file  # Primary is still file
# Or use environment variable
$env:ACM_DUAL_MODE = "true"
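Since the flag can come from either source, it helps to pin down which one wins. A minimal resolution sketch (the function name is assumed, not the actual acm_main implementation) in which the environment variable overrides config.yaml:

```python
import os

def resolve_dual_mode(cfg: dict) -> bool:
    """ACM_DUAL_MODE env var, when set, overrides output.dual_mode from config.yaml.

    The override order here is an assumption; verify against the SQL_MODE
    detection logic in core/acm_main.py.
    """
    env = os.environ.get("ACM_DUAL_MODE")
    if env is not None:
        return env.strip().lower() in ("1", "true", "yes")
    return bool(cfg.get("output", {}).get("dual_mode", False))
```

With this rule, an operator can flip dual-write off per shell session without touching the checked-in config.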
Step 1.2: Register Equipment ⏳
-- Add equipment to database
INSERT INTO Equipment (EquipCode, EquipName, Area, Unit, Status, CommissionDate)
VALUES
('FD_FAN', 'Forced Draft Fan', 'Boiler', 'Unit 1', 1, '2024-01-01'),
('GAS_TURBINE', 'Gas Turbine GT-101', 'Power Generation', 'Unit 1', 1, '2024-01-01');
Step 1.3: Run Pipeline with Dual-Write ⏳
cd "c:\Users\bhadk\Documents\ACM V8 SQL\ACM"
# Run FD_FAN with dual-write enabled
python -m core.acm_main `
--equip FD_FAN `
--artifact-root artifacts `
--enable-report `
--mode file
# Expected behavior:
# - Reads CSV from data/
# - Writes CSV to artifacts/FD_FAN/run_*/
# - ALSO writes to SQL tables (ScoresTS, AnomalyEvents, etc.)
# - Console shows "[DUAL] SQL write succeeded" messages
Step 1.4: Validate SQL Data ⏳
-- Check that data was written to SQL
SELECT 'ScoresTS' AS TableName, COUNT(*) AS [Rows] FROM ScoresTS
UNION ALL SELECT 'AnomalyEvents', COUNT(*) FROM AnomalyEvents
UNION ALL SELECT 'DriftTS', COUNT(*) FROM DriftTS
UNION ALL SELECT 'PCA_Model', COUNT(*) FROM PCA_Model
UNION ALL SELECT 'Runs', COUNT(*) FROM Runs;
-- Should show thousands of rows in ScoresTS, dozens in other tables
Step 1.5: Compare File vs SQL Outputs ⏳
Create validation script: scripts/sql/validate_dual_write.py
# Pseudo-code sketch (exact paths and columns may differ)
import glob
import numpy as np
import pandas as pd
import pyodbc

# Load file output from the most recent run (read_csv does not expand globs)
latest_run = sorted(glob.glob("artifacts/FD_FAN/run_*"))[-1]
file_scores = pd.read_csv(f"{latest_run}/scores.csv")

# Load SQL output
conn = pyodbc.connect(...)  # connection string from configs/sql_connection.ini
sql_scores = pd.read_sql("SELECT * FROM ScoresTS WHERE EquipID = 1", conn)

# Compare: row counts exactly, statistics within float tolerance
assert file_scores.shape[0] == sql_scores.shape[0]
assert np.isclose(file_scores['fused_z'].mean(), sql_scores['fused_z'].mean())
# etc...
Objective: Store trained models in ModelRegistry table instead of .joblib files
Duration: 1 week
Risk: Low (models can still serialize to JSON)
Step 2.1: Enhance ModelVersionManager ⏳
# In core/model_persistence.py
class ModelVersionManager:
    def save_to_sql(self, sql_client, equip_id, run_id, models_dict):
        """
        Persist models to SQL ModelRegistry table.

        Args:
            sql_client: Active SQL connection
            equip_id: Equipment ID
            run_id: Current run UUID
            models_dict: Dict with 'ar1', 'pca', 'iforest', 'gmm', 'regimes'
        """
        cur = sql_client.cursor()
        for model_type, model_obj in models_dict.items():
            params_json = self._serialize_model(model_obj)
            stats_json = self._extract_stats(model_obj)
            # Next version = current max for this (ModelType, EquipID) + 1
            cur.execute(
                "SELECT ISNULL(MAX(Version), 0) + 1 FROM ModelRegistry "
                "WHERE ModelType = ? AND EquipID = ?",
                (model_type, equip_id))
            version = cur.fetchone()[0]
            cur.execute("""
                INSERT INTO ModelRegistry
                (ModelType, EquipID, Version, ParamsJSON, StatsJSON, RunID, EntryDateTime)
                VALUES (?, ?, ?, ?, ?, ?, GETUTCDATE())
            """, (model_type, equip_id, version, params_json, stats_json, run_id))
        sql_client.conn.commit()

    def load_from_sql(self, sql_client, equip_id, model_type, version=None):
        """Load latest (or specific version) model from SQL."""
        # Sketch: assumes a _deserialize_model counterpart to _serialize_model
        cur = sql_client.cursor()
        if version is None:
            cur.execute(
                "SELECT TOP 1 ParamsJSON FROM ModelRegistry "
                "WHERE ModelType = ? AND EquipID = ? ORDER BY Version DESC",
                (model_type, equip_id))
        else:
            cur.execute(
                "SELECT ParamsJSON FROM ModelRegistry "
                "WHERE ModelType = ? AND EquipID = ? AND Version = ?",
                (model_type, equip_id, version))
        row = cur.fetchone()
        return self._deserialize_model(row[0]) if row else None
Step 2.2: Update acm_main.py ⏳
# After model training, save to SQL
if sql_client and cfg.get('output', {}).get('persist_models_sql', False):
    model_mgr.save_to_sql(sql_client, equip_id, run_id, {
        'ar1': ar1_models,
        'pca': pca_model,
        'iforest': iforest_model,
        'gmm': gmm_model,
        'regimes': regime_kmeans,
    })
Step 2.3: Test Model Round-Trip ⏳
# scripts/sql/test_model_persistence.py
# 1. Train models
# 2. Save to SQL
# 3. Load from SQL
# 4. Compare predictions (should match exactly)
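The essence of the round-trip can be checked without a live database by exercising the JSON layer that ModelRegistry stores. The AR1 parameter names below are illustrative, not the pipeline's actual serialization schema:

```python
import json

# Hypothetical AR1 parameters as they might land in ParamsJSON
ar1_params = {"phi": 0.87, "intercept": 0.02, "resid_std": 0.34}

params_json = json.dumps(ar1_params)   # what save_to_sql would INSERT
restored = json.loads(params_json)     # what load_from_sql would hand back

# Predictions from the restored parameters must match exactly
x_prev = 1.5
pred_before = ar1_params["intercept"] + ar1_params["phi"] * x_prev
pred_after = restored["intercept"] + restored["phi"] * x_prev
assert pred_before == pred_after
```

Floats survive a JSON round-trip bit-exactly here, which is why the plan can demand that predictions "match exactly"; anything serialized through lossy formats would need a tolerance instead.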
ModelRegistry table
Objective: Disable file outputs, use SQL as primary storage
Duration: 2-3 weeks
Risk: Medium (requires historian integration)
Data Source: XStudio_Historian (live data)
Output: SQL only (no file artifacts)
Step 3.1: Historian Integration ⏳
# Already implemented in core/historian.py
# Wire into acm_main.py data loading section
if SQL_MODE and not dual_mode:
    # Load from historian instead of CSV
    from core.historian import HistorianClient
    hist_client = HistorianClient.from_ini('xstudio_historian')
    df_train, df_score = hist_client.fetch_equipment_tags_for_acm(
        equip_code=equip_code,
        start_time=train_start,
        end_time=score_end,
    )
Step 3.2: Disable File Writes ⏳
# In core/output_manager.py
if SQL_MODE and not dual_mode:
    # Skip all file writes
    Console.info("[SQL] File writes disabled in SQL-only mode")
    return
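The file/SQL routing implied by these two flags condenses into a single decision. A hypothetical sketch (not the actual output_manager code) of that dispatch:

```python
def write_targets(sql_mode: bool, dual_mode: bool):
    """Decide output destinations from the two mode flags.

    Hypothetical helper: SQL-only when sql_mode is set without dual_mode,
    both destinations under dual-write, file-only otherwise.
    """
    if sql_mode and not dual_mode:
        return ["sql"]
    if dual_mode:
        return ["file", "sql"]
    return ["file"]
```

Keeping this decision in one place means Phase 3 only has to flip flags, not touch every writer.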
Step 3.3: Equipment Scheduler ⏳
Create scripts/run_all_equipment.py:
# Loop through all active equipment
# For each:
# 1. Query latest run timestamp from Runs table
# 2. Fetch new data from historian since last run
# 3. Execute pipeline
# 4. Write results to SQL
# 5. Update Runs table with completion status
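The scheduler's core decision (step 1 above) can be sketched as a pure function over rows pulled from the Runs table. The pair shape and the one-hour interval are assumptions for illustration:

```python
from datetime import datetime, timedelta, timezone

def is_due(last_run_utc, now_utc, interval=timedelta(hours=1)):
    """Due if the equipment has never run, or its last run is older than interval."""
    return last_run_utc is None or (now_utc - last_run_utc) >= interval

def select_due_equipment(equipment, now_utc, interval=timedelta(hours=1)):
    """equipment: iterable of (equip_code, last_run_utc) pairs from the Runs table."""
    return [code for code, last in equipment if is_due(last, now_utc, interval)]
```

The loop body would then fetch historian data since `last_run_utc` and invoke the pipeline for each returned code; keeping the due-check pure makes it trivially unit-testable.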
Step 3.4: Production Deployment ⏳
Monitor the Runs table and performance logs
Day 1-2:
1. Set dual_mode: true in config.yaml
2. Register equipment in the Equipment table
Day 3-4:
5. Create validation script (validate_dual_write.py)
6. Compare file vs SQL outputs (row counts, statistics, keys)
7. Measure performance baseline
8. Fix any schema mismatches or write errors
Day 5:
9. Run 10 dual-write cycles on multiple equipment
10. Document validation results
11. Tune SQL batch writer performance
Day 6-8:
12. Implement save_to_sql() in ModelVersionManager
13. Implement load_from_sql() with version selection
14. Test model round-trip (save → load → predict)
Day 9-10:
15. Integrate model persistence into main pipeline
16. Run 5 training cycles, verify models in ModelRegistry
17. Test cold-start with SQL-loaded models
Day 11-15:
18. Review historian integration code (core/historian.py)
19. Plan equipment scheduler architecture
20. Design monitoring dashboard queries
21. Plan production deployment (scheduler, alerts, backups)
dual_mode flag, writes to both destinations
fast_executemany enabled (10x speedup)
Equipment table
Config priority (highest to lowest):
1. ConfigLog table (runtime overrides)
2. Seed defaults (40_seed_config.sql)
3. config_table.csv (legacy fallback)
4. config.yaml (base defaults)
dt_local (datetime2) - local plant time
Version parameter
Test coverage (planned):
tests/test_dual_write.py - Dual-write logic
tests/test_model_persistence_sql.py - Model save/load
tests/test_sql_client.py - Connection handling
scripts/sql/validate_dual_write.py - File vs SQL comparison
scripts/sql/test_model_roundtrip.py - Model serialization
scripts/sql/test_historian_integration.py - Live data fetch
If SQL integration fails at any phase:
# Disable dual-write
$env:ACM_DUAL_MODE = "false"
# Or remove from config.yaml
Impact: Zero (file mode still works)
# In acm_main.py, disable SQL model persistence
persist_models_sql: false
Impact: Models saved to .joblib files instead
# Re-enable file mode, disable SQL-only
python -m core.acm_main --mode file --equip FD_FAN
Impact: Returns to CSV file processing
# Edit config.yaml
output:
  dual_mode: true
# Register equipment
sqlcmd -S localhost\B19CL3PCQLSERVER -E -d ACM -Q "
INSERT INTO Equipment (EquipCode, EquipName, Area, Unit, Status)
VALUES ('FD_FAN', 'Forced Draft Fan', 'Boiler', 'Unit 1', 1)"
# Run pipeline
python -m core.acm_main --equip FD_FAN --artifact-root artifacts --enable-report
-- Check SQL data
SELECT COUNT(*) FROM ScoresTS;
SELECT COUNT(*) FROM AnomalyEvents;
SELECT * FROM Runs ORDER BY StartTimeUTC DESC;
save_to_sql() in ModelVersionManager
configs/
  sql_connection.ini       # Multi-database connections
  config.yaml              # Legacy fallback (kept)
core/
  sql_client.py            # Enhanced for multi-DB
  historian.py             # NEW - Historian client
  acm_main.py              # Modified _load_config()
  data_io.py               # SQL writers (already exist)
utils/
  sql_config.py            # NEW - SQL config reader/writer
scripts/sql/
  40_seed_config.sql       # NEW - Config seeding
  test_config_load.py      # NEW - Test script
  (other SQL scripts)      # Already exist from previous work
docs/sql/
  XHS_*.sql                # Historian SPs (for reference)
ACM/
├── configs/
│ ├── sql_connection.ini Local SQL connection (Windows Auth)
│ ├── config.yaml Base config with dual_mode flag
│ └── config_table.csv Legacy config fallback
│
├── core/
│ ├── acm_main.py Main pipeline (dual-write ready, line 658)
│ ├── sql_client.py SQL connection manager (Windows Auth)
│ ├── output_manager.py Dual-write coordinator (4547 lines)
│ ├── sql_performance.py Batch writer + performance monitor
│ ├── model_persistence.py Model versioning (ready for SQL)
│ └── historian.py Historian client (Phase 3)
│
├── utils/
│ ├── sql_config.py SQL config reader/writer
│ └── logger.py Console logging
│
├── scripts/sql/
│ ├── 00_create_database.sql Create ACM database
│ ├── 10_core_tables.sql 21 tables (Equipment, Runs, etc.)
│ ├── 15_config_tables.sql Config + audit tables
│ ├── 20_stored_procs.sql 19 write procedures
│ ├── 25_equipment_discovery_procs.sql DOW integration
│ ├── 30_views.sql 5 analytical views
│ ├── 40_seed_config.sql Config seeding script
│ ├── verify_acm_connection.py Connection test script
│ ├── test_config_load.py Config validation script
│ ├── validate_dual_write.py ⏳ TO CREATE (Phase 1)
│ └── test_model_persistence.py ⏳ TO CREATE (Phase 2)
│
├── data/ CSV input files (Phase 1 data source)
│ ├── FD FAN TRAINING DATA.csv
│ └── Gas Turbine Training Data Set.csv
│
└── artifacts/ File outputs (dual-write destination)
└── {EQUIP}/
├── run_{timestamp}/ CSV/JSON/PNG outputs
└── models/ .joblib model cache
What works NOW:
ACM_Config table
What's preserved:
What's next (IMMEDIATE ACTIONS):
Enable dual-write (output.dual_mode: true)
END OF REVISED SQL INTEGRATION PLAN